Pika 线程模型简介
Introduction
Pika是Qihoo360 开源的一款兼容redis协议的高性能kv存储服务,其与redis最大的不用点是其数据是基于磁盘而不是基于内存。同时使用了多线程的方式提高了读写效率。更多的整体设计和实践细节参见Wiki的设计实现板块。这里是更多做代码层面分析,适合想要从事Pika开发的同学。
Pika使用了同样是Qihoo360 开源的Pink 网络库,如果要了解pika的代 码,首先要了解其网络库的网络模型,和调用关系,下面我们来看一下Pink的网络模型。
线程模型
一切从Thread 开始
class Thread {
int Thread::StartThread() {
return pthread_create(&thread_id_, nullptr, RunThread, (void *)this);
}
void* Thread::RunThread(void *arg) {
// thread 的执行函数
thread->ThreadMain()
}
virtual void *ThreadMain() = 0;
}
Tread 类只是对pthread 调用进行了一层封装,值得注意的是ThreadMain 是各线程的入口函数。
ServerThread
class ServerThread : public Thread {
const ServerHandle *handle_;
void ServerThread::DoCronTask();
void ServerThread::ProcessNotifyEvents(const PinkFiredEvent* pfe);
virtual void HandleConnEvent(PinkFiredEvent *pfe) = 0;
virtual void HandleNewConn(int connfd, const std::string& ip_port) = 0;
void *ServerThread::ThreadMain() {
while (!should_stop()) {
DoCronTask();
// epoll_wait
nfds = pink_epoll_->PinkPoll(timeout);
for (int i = 0; i < nfds; i++) {
if (pfe->fd == pink_epoll_->notify_receive_fd()) {
ProcessNotifyEvents(pfe);
continue;
}
if (/*is listening fd*/) {
connfd = accept(fd, (struct sockaddr *) &cliaddr, &clilen);
handle_->AccessHandle(ip_port);
/*
* Handle new connection,
* implemented in derived class
*/
HandleNewConn(connfd, ip_port);
} else {
/*
* Handle connection's event
* implemented in derived class
*/
HandleConnEvent(pfe);
}
}
}
}
}
ServerThread类主要提供一个server的框架,也是一个虚类,衍生类需要实现HandleConnEvent HandleNewConn 等函数。
1,其中ServerHandle 是各种事件发生之后的回调函数,这个类由使用者实现并传入serverthread,各类事件(例如链接关闭)发生时,serverthread 会调用相应的回调函数通知serverthread 的使用者。ServerHandle包含的函数有处理连接超时的FdTimeoutHandle,处理连接被关闭的FdClosedHandle 等等。
class ServerHandle {
virtual void FdTimeoutHandle(int fd, const std::string& ip_port) const {
UNUSED(fd);
UNUSED(ip_port);
}
virtual void FdClosedHandle(int fd, const std::string& ip_port) const {
UNUSED(fd);
UNUSED(ip_port);
}
...
}
2,ProcessNotifyEvents 函数主要是serverthread处理异步通信用的,其他线程可以通过写入PinkEpoll 的notify_send_fd(具体的消息内容存到了notify_queue里面),在下一次serverthread epoll 循环的时候就可以通过notify_receive_fd读到有消息要处理,从notify_queue读出消息,处理其他线程的事件。
class PinkEpoll {
// fds is pipe
notify_receive_fd_ = fds[0];
notify_send_fd_ = fds[1];
std::queue<PinkItem> notify_queue_;
}
DispatchThread
DispatchThread维护一组worker thread,worker thread 用来处理客户端连接的读写,DispatchThread主要负责accept 客户端socket连接,然后通过worker线程的pink_epoll 通知worker线程。DispatchThread继承于ServerThread只是实现了HandleNewConn 用于处理新连接。
class DispatchThread : public ServerThread {
void DispatchThread::HandleNewConn() {
// schedule this conn to one of worker
PinkItem ti(connfd, ip_port, kNotiConnect);
std::queue<PinkItem> *q
= &(worker_thread_[next_thread]->pink_epoll()->notify_queue_);
q->push(ti);
}
}
// 其本质是一个 server 线程的缩写版本 只处理conn 的读写
class WorkerThread : public Thread {
// pink_epoll 为了Dispatch 线程accept 之后可以通知给worker线程
PinkEpoll *pink_epoll_;
void *WorkerThread::ThreadMain() {
while(!should_stop) {
nfds = pink_epoll_->PinkPoll(timeout);
for (int i = 0; i < nfds; i++) {
if (pfe->fd == pink_epoll_->notify_receive_fd()) {
if (ti.notify_type() == kNotiConnect) {
// 将accept 的fd 收入到自己的conn结构里面,之后负责这个conn的读写
std::shared_ptr<PinkConn> tc = conn_factory_->NewPinkConn(
ti.fd(), ti.ip_port(),
server_thread_, private_data_, pink_epoll_);
} else if (ti.notify_type() == kNotiEpollout) {
pink_epoll_->PinkModEvent(ti.fd(), 0, EPOLLOUT);
} else if (....) {
....
}
} else {
if ((pfe->mask & EPOLLOUT) && in_conn->is_reply()) {
WriteStatus write_status = in_conn->SendReply();
...
}
if (!should_close && (pfe->mask & EPOLLIN)) {
ReadStatus read_status = in_conn->GetRequest();
...
}
}
}
}
}
}
1,新建客户端连接时候Dispatch Thread调用HandleNewConn 向worker thread 传入fd消息,worker thread 负责调用conn_factory 中的NewPinkConn方法,按照使用者的实现方式新建一个connection负责后续的数据读写。
ConnFactory 是一个工厂类,用于创建连接,DispatchThread 的使用者必须自己实现其ConnFactory::NewPinkConn 函数。
/*
* for every conn, we need create a corresponding ConnFactory
*/
class ConnFactory {
public:
virtual ~ConnFactory() {}
virtual std::shared_ptr<PinkConn> NewPinkConn(
int connfd,
const std::string &ip_port,
Thread *thread,
void* worker_private_data, /* Has set in ThreadEnvHandle */
PinkEpoll* pink_epoll = nullptr) const = 0;
};
2,后续通过调用不同connection实现的SendReply 和GetRequest 进行读写操作。
Connection
PinkConn类是对客户端连接的抽象,控制客户端数据的读取,解析,以及结果的缓存,写回。不同协议的connection回有相应的自己实现,这里以RedisConn为例。
class PinkConn : public std::enable_shared_from_this<PinkConn> {
virtual ReadStatus GetRequest() = 0;
virtual WriteStatus SendReply() = 0;
int fd_;
std::string ip_port_;
bool is_reply_;
struct timeval last_interaction_;
int flags_;
}
PinkConn 当中包含了GetRequest SendReply两个纯虚函数,控制数据如何读取和写回。
class RedisConn: public PinkConn {
RedisConn(const int fd,
const std::string& ip_port,
...
const HandleType& handle_type = kSynchronous,
const int rbuf_max_len = REDIS_MAX_MESSAGE);
// serverthread 或者 worker thread 调用
virtual ReadStatus GetRequest() {
// read from socket
// forward to redis_parser_, redis_parser will invoke callback
// when one command is completely parsed
redis_parser_.ProcessInputBuffer();
}
virtual WriteStatus SendReply() {
// write response back
while() {
nwritten = write(fd(), response_.data() + wbuf_pos_,
wbuf_len - wbuf_pos_);
}
}
// RedisConn 使用者调用
int RedisConn::WriteResp(const std::string& resp) {
response_.append(resp);
set_is_reply(true);
return 0;
}
// kAsynchronous 调用接口
virtual void AsynProcessRedisCmds(
const std::vector<RedisCmdArgsType>& argvs, std::string* response);
void NotifyEpoll(bool success) {
// write to server thread or worker thread(conn holder) notify_send_fd
// if success true tell conn holder set fd kEpolloutAndEpollin
// if success false close conn holder this conn
}
//kSynchronous 调用接口
virtual int DealMessage(
const RedisCmdArgsType& argv, std::string* response) = 0;
RedisParser redis_parser_;
std::string response_;
}
RedisConn::RedisConn() {
// 初始化 将redis conn的ParserDealMessageCb ParserCompleteCb挂载到
// redis parser中,redis parser 解析出完整的一条redis 命令会调用相
// 应的ParserDealMessageCb 或者ParserCompleteCb 函数
RedisParserSettings settings;
settings.DealMessage = ParserDealMessageCb;
settings.Complete = ParserCompleteCb;
}
// 根据具体的RedisConn 的配置是否为异步模式 调用RedisConn的DealMessage
// 函数或者AsynProcessRedisCmds
int RedisConn::ParserDealMessageCb(
RedisParser* parser, const RedisCmdArgsType& argv) {
RedisConn* conn = reinterpret_cast<RedisConn*>(parser->data);
if (conn->GetHandleType() == HandleType::kSynchronous) {
return conn->DealMessage(argv, &(conn->response_));
} else {
return 0;
}
}
int RedisConn::ParserCompleteCb(
RedisParser* parser, const std::vector<RedisCmdArgsType>& argvs) {
RedisConn* conn = reinterpret_cast<RedisConn*>(parser->data);
if (conn->GetHandleType() == HandleType::kAsynchronous) {
conn->AsynProcessRedisCmds(argvs, &(conn->response_));
}
return 0;
}
RedisConn 中的redis_parser 结构用于解析redis 命令,每当解析完一条完整的redis 命令后回调相应的回调函数。
1,GetRequest 函数会不停的从socket读数据,放入redis_parser解析。redis_parser根据解析情况在合适的时候调用其DealMessage Complete回调。
2,SendReply 函数,会在合适的时机调用write 将response 缓存的返回数据写向客户端。
使用者需要继承RedisConn 实现类似MyRedisConn 的类,并且在其内部实现 DealMessage 函数或者AsynProcessRedisCmds。取决于当前使用的Conn的模式。
kAsynchronous模式下AsynProcessRedisCmds 的实现通常把解析好的Cmd 交付给另外的线程处理,例如T1,T1在处理完命令之后 ,调用WriteResp接口将结果写入RedisConn,再调用NotifyEpoll 将这个RedisConn 的EPOLLOUT监听打开,serverthread 或者worker thread的下一个epoll 循环 可以将结果写回。
kSynchronous 模式下DealMessage 的实现通常会直接处理命令调用WriteResp,返回worker thread 的ThreadMain 函数之后,在下一个epoll循环将结果写回。
Pika 线程模型以及Cmd处理顺序
Pika 的PikaDispatchThread 是DispatchThread 的定制实现,PikaClientConn是RedisConn 的定制实现。
class PikaDispatchThread {
class ClientConnFactory : public pink::ConnFactory {
virtual std::shared_ptr<pink::PinkConn> NewPinkConn() {
return std::static_pointer_cast<pink::PinkConn>
(std::make_shared<PikaClientConn>())
}
}
class Handles : public pink::ServerHandle {}
}
class PikaClientConn: public pink::RedisConn {
void AsynProcessRedisCmds(
const std::vector<pink::RedisCmdArgsType>& argvs,
std::string* response) override;
};
void PikaClientConn::AsynProcessRedisCmds() {
// schedule to thread pool
g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg);
}
// thread pool thead start processing cmd
void PikaClientConn::DoBackgroundTask(void* arg) {
if (error) {
// close conn
conn_ptr->NotifyEpoll(false);
}
conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
}
void PikaClientConn::BatchExecRedisCmd(
const std::vector<pink::RedisCmdArgsType>& argvs) {
// process cmd...
TryWriteResp();
}
void PikaClientConn::TryWriteResp() {
// write response to local resp
...
// notify worker thread open fd EPOLLOUT and ready to write back
NotifyEpoll(true);
}
1,新的客户端连接接入到DispatchThread,DispatchThread accept 并生成fd,传递到worker thread。
2,worker thread 调用ClientConnFactory 的NewPinkConn 生成PikaClientConn,从此维护此PikaClientConn的读写行为。
3,worker thread调用GetRequest读取客户端请求放入redis_parser 进行解析,redis_parser 调用PikaClientConn::AsynProcessRedisCmds实现。将此cmd 放入thread pool queue。
4,thread pool thread 处理cmd,调用PikaClientConn::WriteResp将处理结果写入PikaClientConn的resp 结构,调用PikaClientConn::NotifyEpoll 通知worker thread 可以返回客户端。
5,worker thread 接收kNotiEpollout事件,打开这个conn fd 的EPOLLOUT ,下一个epoll_wait 周期检测到这个conn可写,调用WriteResp将resp的内容返回给客户端。
Reference
https://github.com/Qihoo360/pika/tree/v3.3.4 https://github.com/Qihoo360/pink